package drds.data_propagate.parse.multistage_coordinator;

import com.lmax.disruptor.*;
import drds.data_propagate.binlog_event.protogenesis.BinLogEvent;
import drds.data_propagate.binlog_event.protogenesis.BinLogEventsContext;
import drds.data_propagate.binlog_event.protogenesis.Buffer;
import drds.data_propagate.binlog_event.protogenesis.event.binlog_management.FormatDescriptionEvent;
import drds.data_propagate.common.AbstractLifeCycle;
import drds.data_propagate.common.utils.NamedThreadFactory;
import drds.data_propagate.driver.packets.GtidSet;
import drds.data_propagate.parse.*;
import drds.data_propagate.parse.exception.ParseException;
import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/**
 * 针对解析器提供一个多阶段协同的处理
 *
 * <pre>
 * 1. 网络接收 (单线程)
 * 2. 事件基本解析 (单线程，事件类型、DDL解析构造TableMeta、维护位点信息)
 * 3. 事件深度解析 (多线程, DML事件数据的完整解析)
 * 4. 投递到store (单线程)
 * </pre>
 */
public class MultistageCoordinatorImpl extends AbstractLifeCycle implements MultistageCoordinator {

    private static final int maxFullTimes = 10;
    @Setter
    @Getter
    public BinlogEventConvertToEntry binlogEventConvertToEntry;
    @Setter
    @Getter
    public EventList eventList;
    @Setter
    @Getter
    public Dumper dumper;
    @Setter
    @Getter
    public volatile ParseException exception;
    @Setter
    @Getter
    public ExceptionHandler exceptionHandler = new ExceptionHandlerImpl();
    @Setter
    @Getter
    public GtidSet gtidSet;
    @Setter
    @Getter
    private int dmlParseStageHandlerThreadCount;
    @Setter
    @Getter
    private int ringBufferSize;
    @Setter
    @Getter
    private RingBuffer<MessageEvent> messageEventRingBuffer;
    @Setter
    @Getter
    private ExecutorService dmlParseStageHandlerExecutorService;
    @Setter
    @Getter
    private ExecutorService multistageExecutorService;
    @Setter
    @Getter
    private String destination;
    @Setter
    @Getter
    private AtomicLong eventsPublishBlockingTime;
    @Setter
    @Getter
    private WorkerPool<MessageEvent> dmlParseStageWorkerPool;
    @Setter
    @Getter
    private BatchEventProcessor<MessageEvent> parseStageProcessor;
    @Setter
    @Getter
    private BatchEventProcessor<MessageEvent> sinkStoreStageProcessor;
    @Setter
    @Getter
    private BinLogEventsContext binLogEventsContext;

    public MultistageCoordinatorImpl(int ringBufferSize, int dmlParseStageHandlerThreadCount, BinlogEventConvertToEntry binlogEventConvertToEntry, EventList eventList, String destination) {
        this.ringBufferSize = ringBufferSize;
        this.dmlParseStageHandlerThreadCount = dmlParseStageHandlerThreadCount;
        this.binlogEventConvertToEntry = binlogEventConvertToEntry;
        this.eventList = eventList;
        this.destination = destination;
    }

    @Override
    public void start() {
        super.start();
        this.exception = null;
        //
        this.binLogEventsContext = new BinLogEventsContext();
        this.messageEventRingBuffer = RingBuffer.createSingleProducer(new MessageEventFactory(), ringBufferSize, new BlockingWaitStrategy());

        // parseStage[单线程]
        SequenceBarrier parseStageSequenceBarrier = messageEventRingBuffer.newBarrier();
        parseStageProcessor = new BatchEventProcessor<MessageEvent>(messageEventRingBuffer, parseStageSequenceBarrier, new ParseStageHandler(this, binLogEventsContext));
        parseStageProcessor.setExceptionHandler(exceptionHandler);
        messageEventRingBuffer.addGatingSequences(parseStageProcessor.getSequence());

        // dmlParseStage[多线程]
        SequenceBarrier dmlParseStageSequenceBarrier = messageEventRingBuffer.newBarrier(parseStageProcessor.getSequence());
        int dmlParseStageHandlerThreadCount = this.dmlParseStageHandlerThreadCount > 0 ? this.dmlParseStageHandlerThreadCount : 1;
        this.dmlParseStageHandlerExecutorService = Executors.newFixedThreadPool(dmlParseStageHandlerThreadCount, new NamedThreadFactory("MultistageCoordinator-Parser", destination));
        WorkHandler<MessageEvent>[] dmlParseStageHandlers = new DmlParseStageHandler[dmlParseStageHandlerThreadCount];
        for (int i = 0; i < dmlParseStageHandlerThreadCount; i++) {
            dmlParseStageHandlers[i] = new DmlParseStageHandler(this);
        }
        dmlParseStageWorkerPool = new WorkerPool<MessageEvent>(messageEventRingBuffer, dmlParseStageSequenceBarrier, exceptionHandler, dmlParseStageHandlers);
        Sequence[] dmlParseStageSequences = dmlParseStageWorkerPool.getWorkerSequences();
        messageEventRingBuffer.addGatingSequences(dmlParseStageSequences);
        // sinkStoreStage[单线程]
        SequenceBarrier sinkStoreStageSequenceBarrier = messageEventRingBuffer.newBarrier(dmlParseStageSequences);
        sinkStoreStageProcessor = new BatchEventProcessor<MessageEvent>(messageEventRingBuffer, sinkStoreStageSequenceBarrier, new SinkStoreStageHandler(this));
        sinkStoreStageProcessor.setExceptionHandler(exceptionHandler);
        messageEventRingBuffer.addGatingSequences(sinkStoreStageProcessor.getSequence());
        // start
        this.multistageExecutorService = Executors.newFixedThreadPool(2, new NamedThreadFactory("MultistageCoordinator-other", destination));
        multistageExecutorService.submit(parseStageProcessor);
        dmlParseStageWorkerPool.start(dmlParseStageHandlerExecutorService);
        multistageExecutorService.submit(sinkStoreStageProcessor);

    }

    public void setBinlogChecksum(int binlogChecksum) {
        if (binlogChecksum != BinLogEvent.binlog_checksum_alg_off) {
            binLogEventsContext.setFormatDescriptionEvent(new FormatDescriptionEvent(4, binlogChecksum));
        }
    }

    @Override
    public void stop() {
        parseStageProcessor.halt();
        dmlParseStageWorkerPool.halt();
        sinkStoreStageProcessor.halt();
        try {
            dmlParseStageHandlerExecutorService.shutdownNow();
            while (!dmlParseStageHandlerExecutorService.awaitTermination(1, TimeUnit.SECONDS)) {
                if (dmlParseStageHandlerExecutorService.isShutdown() || dmlParseStageHandlerExecutorService.isTerminated()) {
                    break;
                }
                dmlParseStageHandlerExecutorService.shutdownNow();
            }
        } catch (Throwable e) {
            // ignore
        }

        try {
            multistageExecutorService.shutdownNow();
            while (!multistageExecutorService.awaitTermination(1, TimeUnit.SECONDS)) {
                if (multistageExecutorService.isShutdown() || multistageExecutorService.isTerminated()) {
                    break;
                }
                multistageExecutorService.shutdownNow();
            }
        } catch (Throwable e) {
            // ignore
        }
        super.stop();
    }

    /**
     * 网络数据投递
     */
    public boolean publish(Buffer buffer) {
        return this.publish(buffer, null);
    }

    public boolean publish(BinLogEvent binLogEvent) {
        return this.publish(null, binLogEvent);
    }

    private boolean publish(Buffer buffer, BinLogEvent binLogEvent) {
        if (!isStart()) {
            if (exception != null) {
                throw exception;
            }
            return false;
        }
        /**
         * 由于改为processor仅终止自身stage而不是stop，那么需要由incident标识coprocessor是否正常工作。
         * 让dump线程能够及时感知
         */
        if (exception != null) {
            throw exception;
        }
        boolean interupted = false;
        long blockingStart = 0L;
        int fullTimes = 0;
        do {
            try {
                long nextIndex = messageEventRingBuffer.tryNext();
                MessageEvent messageEvent = messageEventRingBuffer.get(nextIndex);
                if (buffer != null) {
                    messageEvent.setBuffer(buffer);
                } else {
                    messageEvent.setBinLogEvent(binLogEvent);
                }
                messageEventRingBuffer.publish(nextIndex);
                if (fullTimes > 0) {
                    eventsPublishBlockingTime.addAndGet(System.nanoTime() - blockingStart);
                }
                break;
            } catch (InsufficientCapacityException e) {
                if (fullTimes == 0) {
                    blockingStart = System.nanoTime();
                }
                // park
                // LockSupport.parkNanos(1L);
                applyWait(++fullTimes);
                interupted = Thread.interrupted();
                if (fullTimes % 1000 == 0) {
                    long nextStart = System.nanoTime();
                    eventsPublishBlockingTime.addAndGet(nextStart - blockingStart);
                    blockingStart = nextStart;
                }
            }
        } while (!interupted && isStart());
        return isStart();
    }

    // 处理无数据的情况，避免空循环挂死
    private void applyWait(int fullTimes) {
        int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;
        if (fullTimes <= 3) { // 3次以内
            Thread.yield();
        } else { // 超过3次，最多只sleep 1ms
            LockSupport.parkNanos(100 * 1000L * newFullTimes);
        }

    }


}
